[アップデート]AWS IoT Coreでモノの属性をメッセージに伝播させる機能が追加されました!
製造ビジネステクノロジー部の新澤です。
先日 AWS IoT Core のモノに定義された属性値をMQTT5の機能であるユーザープロパティに自動的に付与することができる機能の追加が発表されました。
概要
AWS IoT Core の「モノ」には、従来より任意の属性値を付与することができましたが、モノを検索する場合に用いる場合など用途が限られていました。
今回のアップデートでは、「モノのタイプ」に「伝播属性」という新たな設定が加わり、モノのタイプが適用されたモノから発行されたメッセージに伝播属性に設定した値をMQTT5メッセージのユーザープロパティに自動的に付与することができるようになりました。
これにより、ルールでユーザープロパティに伝播された属性値をメッセージのペイロードに追加したり、条件に一致したメッセージのみ処理対象にするといったようなことが可能になります。
それでは、実際に動作を確認してみましょう。
モノの属性値を確認する
AWS IoT Coreの管理コンソールからモノの属性値を確認します。
ここでは、モノのタイプ"vehicle"に関連付けられたモノ"vehicle-001"を例にしています。
このモノは、モノのタイプから3つの検索可能な属性("vehicleId", "model", "manufacturer")と独自に定義された2つの属性("Door", "Color")を持っています。
モノのタイプに伝播属性を追加する
「伝播属性」はモノのタイプに定義しますので、ここではモノのタイプ"vehicle"の編集画面を開いてみます。
今回のアップデートで、追加設定に「伝播属性」の項目が追加されています。
伝播属性には「接続属性」と「モノの属性」の2種類があります。
接続属性は、「クライアントID」「モノの名前」の2種類を指定することが可能で、もう一つの「モノの属性」は、モノに定義されている属性を指定することができます。
いずれも指定した値をメッセージのユーザープロパティにクライアントID・モノの名前を追加することができます。
また、「モノの名前」のほうは「ユーザープロパティキー」を指定することで、ユーザープロパティに追加する際に属性の名前と違う名前に変更して付加することができるようになっています。
今回は、「接続属性」に「クライアントID」、「モノの属性」に「Color」を追加してみました。
ルールを編集する
ペイロードの全ての値を取得するSQLが設定されたルールを修正して、追加した伝播属性をペイロードに含めるように変更してみます。
ここでは、モノの属性"Color"をペイロードに追加するように変更してみます。
なお、このルールにはトピックに発行されたメッセージのペイロードをそのままログに出力するシンプルなLambda関数が設定されています。
クライアントからメッセージを発行してみる
MQTT5 クライアントからメッセージを送信して動作確認してみます。
クライアントのコードは以下のような接続後に1回メッセージを送信するだけのものです。
from awsiot import mqtt5_client_builder
from awscrt import mqtt5
from concurrent.futures import Future
import time
import json
TIMEOUT = 100
client_id = "vehicle-001"
message_topic = "vehicle/test"
endpoint = "xxxxxxxxxxxxx-ats.iot.ap-northeast-1.amazonaws.com"
cert_path = f"certs/{client_id}-certificate.pem.crt"
private_key_path = f"certs/{client_id}-private.pem.key"
root_ca_path = "./AmazonRootCA1.pem"
message_count = 1
data = {"message": "Hello, MQTT!"}
future_stopped = Future()
future_connection_success = Future()
# Callback for the lifecycle event Stopped
def on_lifecycle_stopped(lifecycle_stopped_data: mqtt5.LifecycleStoppedData):
print("Lifecycle Stopped")
global future_stopped
future_stopped.set_result(lifecycle_stopped_data)
# Callback for the lifecycle event Connection Success
def on_lifecycle_connection_success(
lifecycle_connect_success_data: mqtt5.LifecycleConnectSuccessData,
):
print("Lifecycle Connection Success")
global future_connection_success
future_connection_success.set_result(lifecycle_connect_success_data)
# Callback for the lifecycle event Connection Failure
def on_lifecycle_connection_failure(
lifecycle_connection_failure: mqtt5.LifecycleConnectFailureData,
):
print("Lifecycle Connection Failure")
print(
"Connection failed with exception:{}".format(
lifecycle_connection_failure.exception
)
)
if __name__ == "__main__":
print("\nStarting MQTT5 Publish Sample\n")
# プロキシオプションの作成
proxy_options = None
# プロキシ設定が必要な場合は適宜設定
# MQTT5クライアントの作成
client = mqtt5_client_builder.mtls_from_path(
endpoint=endpoint,
port=8883, # 必要に応じて変更
cert_filepath=cert_path,
pri_key_filepath=private_key_path,
ca_filepath=root_ca_path,
http_proxy_options=proxy_options,
on_lifecycle_stopped=on_lifecycle_stopped,
on_lifecycle_connection_success=on_lifecycle_connection_success,
on_lifecycle_connection_failure=on_lifecycle_connection_failure,
client_id=client_id,
)
print("MQTT5 Client Created")
print(f"Connecting to {endpoint} with client ID '{client_id}'...")
client.start()
lifecycle_connect_success_data = future_connection_success.result(TIMEOUT)
connack_packet = lifecycle_connect_success_data.connack_packet
print(f"Connected to endpoint: '{endpoint}' with Client ID: '{client_id}'")
if data:
print("Sending {} message(s)".format(message_count))
for publish_count in range(1, message_count + 1):
message = "{} [{}]".format(json.dumps(data), publish_count)
print("Publishing message to topic '{}': {}".format(message_topic, message))
publish_future = client.publish(
mqtt5.PublishPacket(
topic=message_topic,
payload=json.dumps(data),
qos=mqtt5.QoS.AT_LEAST_ONCE,
)
)
publish_completion_data = publish_future.result(TIMEOUT)
print(
"PubAck received with {}".format(
repr(publish_completion_data.puback.reason_code)
)
)
time.sleep(1)
print("Stopping Client")
client.stop()
future_stopped.result(TIMEOUT)
print("Client Stopped!")
管理コンソールで MQTT テストクライアントを起動し、メッセージ送信先のトピックのサブスクライブを実行します。
ここでは "vehicle/test" としています。
次に上記クライアントコードを実行します。
python mqtt_client.py
Starting MQTT5 Publish Sample
MQTT5 Client Created
Connecting to xxxxxxxxxxxxx-ats.iot.ap-northeast-1.amazonaws.com with client ID 'vehicle-001'...
Lifecycle Connection Success
Connected to endpoint: 'xxxxxxxxxxxxx-ats.iot.ap-northeast-1.amazonaws.com' with Client ID: 'vehicle-001'
Sending 1 message(s)
Publishing message to topic 'vehicle/test': {"message": "Hello, MQTT!"} [1]
PubAck received with <PubackReasonCode.SUCCESS: 0>
Stopping Client
Lifecycle Stopped
Client Stopped!
設定した値がメッセージのユーザープロパティに追加されているのが確認できました。
ルールで実行された Lambda 関数のログも確認してみます。
ルールで指定した通り、伝播属性"Color"がメッセージのペイロードに入っていました。
なお、get_user_properties()関数で取り出した値は、配列になります。
最後に
今回のアップデートで、デバイスに手を加えずにデバイスの情報をペイロードに付加したり、条件によって処理を振り分けたりできるようになりました。
デバイスを変更せず、送信されるMQTTメッセージのサイズも増やすことなくデバイスの情報をメッセージに追加できるというのは、かなり使い所が多いのではないでしょうか。